Chapter 29: Recursion in Data Processing
Processing Hierarchical Data (JSON, XML)
Hierarchical data structures—JSON objects, XML documents, nested dictionaries—are naturally recursive. Each node can contain more nodes of the same type, creating arbitrary depth. This is where recursion shines: the structure of the data matches the structure of the code.
Let's build a practical tool that processes real configuration files. Our anchor example will be analyze_config(), a function that traverses nested configuration data to extract insights.
The Reference Problem: Configuration File Analysis
Modern applications use nested configuration files. Consider this Python dictionary representing a microservices configuration:
config = {
"service": "api-gateway",
"version": "2.1.0",
"database": {
"host": "localhost",
"port": 5432,
"credentials": {
"username": "admin",
"password": "secret123"
}
},
"services": {
"auth": {
"url": "http://auth-service:8080",
"timeout": 30
},
"payment": {
"url": "http://payment-service:8081",
"timeout": 60,
"retry": {
"max_attempts": 3,
"backoff": 2
}
}
}
}
Our goal: Find all string values in this nested structure, regardless of depth.
def find_all_strings(data):
"""Find all string values in nested dict/list structure."""
if isinstance(data, str):
return [data]
# Try to iterate - works for both dict and list
result = []
for item in data:
result.extend(find_all_strings(item))
return result
# Test with our config
config = {
"service": "api-gateway",
"version": "2.1.0",
"database": {
"host": "localhost",
"port": 5432,
"credentials": {
"username": "admin",
"password": "secret123"
}
}
}
strings = find_all_strings(config)
print(f"Found {len(strings)} strings:")
for s in strings:
print(f" - {s}")
Python Output:
Found 3 strings:
- service
- version
- database
Diagnostic Analysis: Understanding the Failure
The complete execution trace:
The function runs without crashing, but the result is wrong: it returns only the top-level keys, not the string values buried in the nested structure. Let's trace what happened:
find_all_strings(config)
→ data is dict, not string
→ for item in config: # Iterates over KEYS: "service", "version", "database"
→ item = "service"
→ find_all_strings("service")
→ data is string! Return ["service"]
→ item = "version"
→ find_all_strings("version")
→ data is string! Return ["version"]
→ item = "database"
→ find_all_strings("database")
→ data is string! Return ["database"]
Wait... we're iterating over dictionary KEYS, not VALUES!
Let's parse this section by section:
- The symptom: the output contains the top-level keys ("service", "version", "database"), not the values we wanted, and the nested values ("localhost", "admin", "secret123") are missing entirely
- The logic flaw:
  - When we write for item in data on a dictionary, Python iterates over keys, not values
  - Every key is a string, so each recursive call hits the base case immediately and returns the key itself
  - We never descend into the nested "database" dictionary at all
- What we actually need:
  - For dictionaries: iterate over values (data.values())
  - For lists: iterate over items (current behavior is correct)
  - We need to distinguish between these two cases
Root cause identified: Iterating over dictionary keys instead of values.
Why the current approach can't solve this: We need type-specific iteration logic.
What we need: Separate handling for dictionaries vs lists.
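A quick standalone sketch (not part of the analyzer) makes the distinction concrete: iterating a dictionary directly yields its keys, while .values() yields the nested data we actually want to recurse into.
d = {"service": "api-gateway", "port": 5432}
for item in d:
    print(item)        # keys: service, port
for value in d.values():
    print(value)       # values: api-gateway, 5432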
Iteration 1: Handling Dictionaries and Lists Correctly
Current State Recap
Our function tries to iterate over all data types uniformly, but dictionaries require special handling—we need their values, not their keys.
The Fix: Type-Specific Iteration
Let's distinguish between dictionaries and lists:
def find_all_strings(data):
"""Find all string values in nested dict/list structure."""
# Base case: found a string
if isinstance(data, str):
return [data]
# Recursive case: process nested structure
result = []
if isinstance(data, dict):
# For dicts, iterate over VALUES
for value in data.values():
result.extend(find_all_strings(value))
elif isinstance(data, list):
# For lists, iterate over ITEMS
for item in data:
result.extend(find_all_strings(item))
return result
# Test with our config
config = {
"service": "api-gateway",
"version": "2.1.0",
"database": {
"host": "localhost",
"port": 5432,
"credentials": {
"username": "admin",
"password": "secret123"
}
},
"services": {
"auth": {
"url": "http://auth-service:8080",
"timeout": 30
},
"payment": {
"url": "http://payment-service:8081",
"timeout": 60,
"retry": {
"max_attempts": 3,
"backoff": 2
}
}
}
}
strings = find_all_strings(config)
print(f"Found {len(strings)} strings:")
for s in strings:
print(f" - {s}")
Python Output:
Found 7 strings:
- api-gateway
- 2.1.0
- localhost
- admin
- secret123
- http://auth-service:8080
- http://payment-service:8081
Success! The function now correctly traverses the nested structure.
Call Stack Visualization
Let's trace how this works for a simplified config:
find_all_strings({"db": {"host": "localhost", "port": 5432}})
↓ data is dict
↓ for value in data.values(): # value = {"host": "localhost", "port": 5432}
↓ find_all_strings({"host": "localhost", "port": 5432})
↓ data is dict
↓ for value in data.values(): # First: "localhost", then: 5432
↓ find_all_strings("localhost")
↓ data is string! BASE CASE
↑ return ["localhost"]
↓ find_all_strings(5432)
↓ data is int (not string, not dict, not list)
↓ result = []
↑ return [] # Empty list for non-string primitives
↑ return ["localhost"]
↑ return ["localhost"]
Current Limitation
This works, but we're silently ignoring non-string values (integers, booleans, None). What if we want to find all primitive values, not just strings? Or what if we want to track where each value came from (its path in the nested structure)?
Let's address the first limitation next.
Iteration 2: Collecting All Primitive Values
New Scenario: Finding All Configuration Values
Configuration files contain more than strings—they have integers (ports, timeouts), booleans (feature flags), and None values. Let's extend our function to collect all primitive values.
Before (Iteration 1):
def find_all_strings(data):
if isinstance(data, str):
return [data]
result = []
if isinstance(data, dict):
for value in data.values():
result.extend(find_all_strings(value))
elif isinstance(data, list):
for item in data:
result.extend(find_all_strings(item))
return result
Problem: Only collects strings, ignores integers, booleans, None.
After (Iteration 2):
def find_all_values(data):
"""Find all primitive values (str, int, float, bool, None) in nested structure."""
# Base case: found a primitive value
if isinstance(data, (str, int, float, bool, type(None))):
return [data]
# Recursive case: process nested structure
result = []
if isinstance(data, dict):
for value in data.values():
result.extend(find_all_values(value))
elif isinstance(data, list):
for item in data:
result.extend(find_all_values(item))
return result
# Test with expanded config
config = {
"service": "api-gateway",
"version": "2.1.0",
"database": {
"host": "localhost",
"port": 5432,
"ssl_enabled": True,
"credentials": {
"username": "admin",
"password": "secret123"
}
},
"cache": None,
"services": {
"auth": {
"url": "http://auth-service:8080",
"timeout": 30
}
}
}
values = find_all_values(config)
print(f"Found {len(values)} values:")
for v in values:
print(f" - {v!r} ({type(v).__name__})")
Python Output:
Found 10 values:
- 'api-gateway' (str)
- '2.1.0' (str)
- 'localhost' (str)
- 5432 (int)
- True (bool)
- 'admin' (str)
- 'secret123' (str)
- None (NoneType)
- 'http://auth-service:8080' (str)
- 30 (int)
Improvement: Now captures all primitive values with their types.
Key Change
The base case expanded from:
if isinstance(data, str):
To:
if isinstance(data, (str, int, float, bool, type(None))):
This is a common pattern in recursive data processing: the base case defines what you're collecting.
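To make that concrete, here is a hypothetical variant (find_matching is our own name, not part of the chapter's running example) where the base-case test is passed in as a predicate, so the same traversal can collect whatever you define as a match:
def find_matching(data, predicate):
    """Collect every leaf value for which predicate(value) is True."""
    if not isinstance(data, (dict, list)):
        # Base case: a leaf value - keep it only if the predicate accepts it
        return [data] if predicate(data) else []
    result = []
    children = data.values() if isinstance(data, dict) else data
    for child in children:
        result.extend(find_matching(child, predicate))
    return result

# Example: collect only integers (excluding booleans), using the config above
ints = find_matching(config, lambda v: isinstance(v, int) and not isinstance(v, bool))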
Complexity Analysis
Time Complexity: O(n), where n = total number of values in the nested structure
- Each value is visited exactly once
- One recursive call per value
- Depth of recursion: the maximum nesting depth
Space Complexity: O(d + n), where d = maximum depth and n = number of values
- Call stack depth: d (maximum nesting level)
- Result list: n values
- Each call stores a constant amount of data
Recurrence Relation: T(n) = k·T(n/k) + O(1), where k = branching factor
- For a dict with k keys, we make k recursive calls
- Each call processes roughly 1/k of the remaining data
- Solves to O(n) total work
Current Limitation
We know what values exist, but not where they are. If we find "secret123", we don't know it's at config["database"]["credentials"]["password"]. Let's fix that.
Iteration 3: Tracking Paths to Values
New Scenario: Security Audit
Imagine we need to find all password fields in a configuration file. We need to know not just the value "secret123", but that it's located at database.credentials.password. This requires tracking the path to each value.
The Challenge: Accumulating Path Information
We need to pass path information down through recursive calls. This is the accumulator pattern applied to recursion.
Before (Iteration 2):
def find_all_values(data):
if isinstance(data, (str, int, float, bool, type(None))):
return [data]
result = []
if isinstance(data, dict):
for value in data.values():
result.extend(find_all_values(value))
elif isinstance(data, list):
for item in data:
result.extend(find_all_values(item))
return result
Problem: No way to know where each value came from.
After (Iteration 3):
def find_all_values_with_paths(data, path="root"):
"""
Find all primitive values with their paths in nested structure.
Returns list of tuples: (path, value)
"""
# Base case: found a primitive value
if isinstance(data, (str, int, float, bool, type(None))):
return [(path, data)]
# Recursive case: process nested structure
result = []
if isinstance(data, dict):
for key, value in data.items():
# Build path: root.database.credentials.password
new_path = f"{path}.{key}"
result.extend(find_all_values_with_paths(value, new_path))
elif isinstance(data, list):
for index, item in enumerate(data):
# Build path: root.services[0].url
new_path = f"{path}[{index}]"
result.extend(find_all_values_with_paths(item, new_path))
return result
# Test with config containing sensitive data
config = {
"service": "api-gateway",
"database": {
"host": "localhost",
"port": 5432,
"credentials": {
"username": "admin",
"password": "secret123"
}
},
"api_keys": ["key1", "key2", "key3"],
"services": {
"auth": {
"url": "http://auth-service:8080",
"api_key": "auth_secret_key"
}
}
}
values = find_all_values_with_paths(config)
# Find all potential secrets (values with "password" or "key" in path)
print("Potential secrets found:")
for path, value in values:
if "password" in path.lower() or "key" in path.lower():
print(f" {path} = {value!r}")
Python Output:
Potential secrets found:
root.database.credentials.password = 'secret123'
root.api_keys[0] = 'key1'
root.api_keys[1] = 'key2'
root.api_keys[2] = 'key3'
root.services.auth.api_key = 'auth_secret_key'
Success! We can now identify exactly where sensitive data lives in the configuration.
Call Stack Visualization with Path Accumulation
Let's trace a simplified example:
find_all_values_with_paths({"db": {"pass": "secret"}}, "root")
↓ data is dict
↓ for key="db", value={"pass": "secret"}
↓ new_path = "root.db"
↓ find_all_values_with_paths({"pass": "secret"}, "root.db")
↓ data is dict
↓ for key="pass", value="secret"
↓ new_path = "root.db.pass"
↓ find_all_values_with_paths("secret", "root.db.pass")
↓ data is string! BASE CASE
↑ return [("root.db.pass", "secret")]
↑ return [("root.db.pass", "secret")]
↑ return [("root.db.pass", "secret")]
The Accumulator Pattern
Notice how path is an accumulator parameter:
- It starts with an initial value ("root")
- Each recursive call extends it (f"{path}.{key}")
- The base case uses the accumulated value
- The result carries the full accumulated information
This is a fundamental pattern in recursive data processing.
When to Apply This Solution
What it optimizes for:
- Traceability: know exactly where each value came from
- Debugging: identify problematic configuration paths
- Security audits: find sensitive data locations
What it sacrifices:
- Memory: stores a path string for every value
- Simplicity: more complex than just collecting values
When to choose this approach:
- Configuration validation and auditing
- Error reporting (need to show where invalid data is)
- Data migration (need to map old paths to new paths)
When to avoid this approach:
- Simple value extraction where location doesn't matter
- Performance-critical code (path string building has overhead; see the tuple-based sketch below)
- Very deep nesting (path strings become very long)
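If the path-string overhead matters, one option (a minimal sketch, assuming callers only need the path components) is to accumulate a tuple of keys and indices and format it only when reporting:
def find_values_with_path_tuples(data, path=()):
    """Like find_all_values_with_paths, but accumulates path components as a tuple."""
    if isinstance(data, (str, int, float, bool, type(None))):
        return [(path, data)]
    result = []
    if isinstance(data, dict):
        for key, value in data.items():
            result.extend(find_values_with_path_tuples(value, path + (key,)))
    elif isinstance(data, list):
        for index, item in enumerate(data):
            result.extend(find_values_with_path_tuples(item, path + (index,)))
    return result

# Format paths only when displaying the report
for parts, value in find_values_with_path_tuples(config):
    print(".".join(str(p) for p in parts), "=", repr(value))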
Current Limitation
This works great for dictionaries and lists, but what about JSON files or XML documents? Let's extend to real file formats.
Iteration 4: Processing JSON Files
New Scenario: Analyzing Real Configuration Files
Real applications store configuration in JSON files. Let's build a tool that analyzes JSON files recursively.
The challenge: JSON files can be large and deeply nested. We need to:
1. Load JSON from a file
2. Process it recursively
3. Generate useful reports
Let's build a complete JSON configuration analyzer:
import json
from pathlib import Path
from typing import Any, List, Tuple
def analyze_json_config(data: Any, path: str = "root") -> dict:
"""
Recursively analyze JSON configuration structure.
Returns statistics about the configuration:
- Total values
- Values by type
- Maximum nesting depth
- All paths with their values
"""
stats = {
"total_values": 0,
"by_type": {},
"max_depth": 0,
"paths": []
}
def recurse(data: Any, path: str, depth: int):
# Track maximum depth
stats["max_depth"] = max(stats["max_depth"], depth)
# Base case: primitive value
if isinstance(data, (str, int, float, bool, type(None))):
stats["total_values"] += 1
type_name = type(data).__name__
stats["by_type"][type_name] = stats["by_type"].get(type_name, 0) + 1
stats["paths"].append((path, data))
return
# Recursive case: nested structure
if isinstance(data, dict):
for key, value in data.items():
new_path = f"{path}.{key}"
recurse(value, new_path, depth + 1)
elif isinstance(data, list):
for index, item in enumerate(data):
new_path = f"{path}[{index}]"
recurse(item, new_path, depth + 1)
recurse(data, path, 0)
return stats
# Create a sample JSON config file
config_data = {
"app": {
"name": "MyApp",
"version": "1.0.0",
"debug": True
},
"database": {
"primary": {
"host": "localhost",
"port": 5432,
"credentials": {
"username": "admin",
"password": "secret123"
}
},
"replicas": [
{"host": "replica1", "port": 5433},
{"host": "replica2", "port": 5434}
]
},
"features": {
"auth": True,
"payments": False,
"analytics": True
}
}
# Save to file
with open("config.json", "w") as f:
json.dump(config_data, f, indent=2)
# Load and analyze
with open("config.json", "r") as f:
config = json.load(f)
stats = analyze_json_config(config)
print("Configuration Analysis")
print("=" * 50)
print(f"Total values: {stats['total_values']}")
print(f"Maximum nesting depth: {stats['max_depth']}")
print(f"\nValues by type:")
for type_name, count in sorted(stats['by_type'].items()):
print(f" {type_name}: {count}")
print(f"\nSensitive data locations:")
for path, value in stats['paths']:
if 'password' in path.lower() or 'secret' in path.lower():
print(f" {path} = {'*' * len(str(value))}") # Mask the value
Python Output:
Configuration Analysis
==================================================
Total values: 14
Maximum nesting depth: 4
Values by type:
bool: 4
int: 3
str: 7
Sensitive data locations:
root.database.primary.credentials.password = *********
Key Improvements
- Nested helper function: recurse() is defined inside analyze_json_config(), giving it access to the stats dictionary without passing it as a parameter
- Depth tracking: we pass depth as a parameter and track the maximum
- Type statistics: we count values by type using a dictionary
- File I/O integration: real JSON loading and saving
Manual Trace: Depth Tracking
Let's trace how depth tracking works:
analyze_json_config({"db": {"host": "localhost"}})
↓ stats = {"max_depth": 0, ...}
↓ recurse({"db": {"host": "localhost"}}, "root", depth=0)
↓ stats["max_depth"] = max(0, 0) = 0
↓ data is dict
↓ for key="db", value={"host": "localhost"}
↓ recurse({"host": "localhost"}, "root.db", depth=1)
↓ stats["max_depth"] = max(0, 1) = 1
↓ data is dict
↓ for key="host", value="localhost"
↓ recurse("localhost", "root.db.host", depth=2)
↓ stats["max_depth"] = max(1, 2) = 2 ← Maximum depth reached
↓ data is string! BASE CASE
↓ stats["total_values"] += 1
↑ return
↑ return
↑ return
↑ return stats (with max_depth=2)
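An alternative to the shared stats dictionary, shown here as a minimal sketch, is to compute depth from return values: a primitive has depth 0, and a container is one level deeper than its deepest child.
def max_depth(data):
    """Depth 0 for a primitive; containers are 1 + the deepest child."""
    if isinstance(data, dict):
        children = data.values()
    elif isinstance(data, list):
        children = data
    else:
        return 0  # Base case: primitive value
    return 1 + max((max_depth(child) for child in children), default=0)

print(max_depth({"db": {"host": "localhost"}}))  # 2, matching the trace above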
Practical Application: Configuration Validation
Let's extend this to validate configuration files:
def validate_config(data: Any, rules: dict, path: str = "root") -> List[str]:
"""
Validate configuration against rules.
Rules format:
{
"required_keys": ["app.name", "database.host"],
"forbidden_keys": ["debug"],
"type_constraints": {
"database.port": int,
"app.debug": bool
}
}
Returns list of validation errors.
"""
errors = []
found_paths = set()
def recurse(data: Any, path: str):
found_paths.add(path)
# Base case: primitive value
if isinstance(data, (str, int, float, bool, type(None))):
# Check type constraints
if path in rules.get("type_constraints", {}):
expected_type = rules["type_constraints"][path]
if not isinstance(data, expected_type):
errors.append(
f"{path}: expected {expected_type.__name__}, "
f"got {type(data).__name__}"
)
return
# Recursive case
if isinstance(data, dict):
for key, value in data.items():
new_path = f"{path}.{key}"
# Check forbidden keys
if new_path in rules.get("forbidden_keys", []):
errors.append(f"{new_path}: forbidden key present")
recurse(value, new_path)
elif isinstance(data, list):
for index, item in enumerate(data):
new_path = f"{path}[{index}]"
recurse(item, new_path)
recurse(data, path)
# Check required keys
for required in rules.get("required_keys", []):
full_path = f"root.{required}"
if full_path not in found_paths:
errors.append(f"{full_path}: required key missing")
return errors
# Test validation
config = {
"app": {
"name": "MyApp",
"debug": True # Forbidden in production
},
"database": {
"host": "localhost",
"port": "5432" # Should be int, not string
}
}
rules = {
"required_keys": ["app.name", "database.host", "database.port"],
"forbidden_keys": ["root.app.debug"],
"type_constraints": {
"root.database.port": int
}
}
errors = validate_config(config, rules)
if errors:
print("Validation errors found:")
for error in errors:
print(f" ❌ {error}")
else:
print("✓ Configuration valid")
Python Output:
Validation errors found:
❌ root.app.debug: forbidden key present
❌ root.database.port: expected int, got str
Real-World Use Case: Microservices Configuration
This pattern is used in production systems for:
- Configuration validation: ensure all required settings are present
- Security audits: find hardcoded credentials
- Migration tools: transform old config formats to new formats
- Documentation generation: auto-generate config documentation from structure
Complexity Analysis
Time Complexity: O(n), where n = total number of values
- Each value visited exactly once
- Validation checks are O(1) per value
- Path lookups in sets are O(1)
Space Complexity: O(d + n) where d = depth, n = values
- Call stack: O(d) for maximum nesting depth
- found_paths set: O(n) paths stored
- errors list: O(e) where e = number of errors
When to Apply This Solution:
- Configuration management systems
- API response validation
- Data migration pipelines
- Security scanning tools
Directory Tree Operations
File systems are hierarchical structures—directories contain files and other directories. This is another natural fit for recursion. Let's build practical tools for analyzing directory trees.
The Reference Problem: Calculate Directory Size
Goal: Recursively calculate the total size of a directory, including all subdirectories and files.
Iteration 1: Basic Directory Size Calculator
import os
from pathlib import Path
def calculate_directory_size(path: str) -> int:
"""
Calculate total size of directory in bytes.
Returns total size including all subdirectories and files.
"""
total_size = 0
# Get all items in directory
for item in os.listdir(path):
item_path = os.path.join(path, item)
if os.path.isfile(item_path):
# Base case: it's a file, add its size
total_size += os.path.getsize(item_path)
elif os.path.isdir(item_path):
# Recursive case: it's a directory, recurse into it
total_size += calculate_directory_size(item_path)
return total_size
# Create a test directory structure
test_dir = Path("test_project")
test_dir.mkdir(exist_ok=True)
# Create some files
(test_dir / "main.py").write_text("print('Hello')\n" * 100)
(test_dir / "README.md").write_text("# Project\n" * 50)
# Create subdirectory with files
src_dir = test_dir / "src"
src_dir.mkdir(exist_ok=True)
(src_dir / "module1.py").write_text("def func():\n pass\n" * 200)
(src_dir / "module2.py").write_text("class MyClass:\n pass\n" * 150)
# Calculate size
size_bytes = calculate_directory_size(str(test_dir))
size_kb = size_bytes / 1024
print(f"Directory: {test_dir}")
print(f"Total size: {size_bytes:,} bytes ({size_kb:.2f} KB)")
Python Output:
Directory: test_project
Total size: 9,850 bytes (9.62 KB)
Call Stack Visualization
Let's trace the execution for a simple directory structure:
test_project/
├── file1.txt (100 bytes)
├── file2.txt (200 bytes)
└── subdir/
└── file3.txt (300 bytes)
Execution trace:
calculate_directory_size("test_project")
↓ total_size = 0
↓ for item in ["file1.txt", "file2.txt", "subdir"]:
↓ item = "file1.txt"
↓ item_path = "test_project/file1.txt"
↓ os.path.isfile() = True
↓ total_size += 100 → total_size = 100
↓ item = "file2.txt"
↓ item_path = "test_project/file2.txt"
↓ os.path.isfile() = True
↓ total_size += 200 → total_size = 300
↓ item = "subdir"
↓ item_path = "test_project/subdir"
↓ os.path.isdir() = True
↓ calculate_directory_size("test_project/subdir")
↓ total_size = 0
↓ for item in ["file3.txt"]:
↓ item = "file3.txt"
↓ item_path = "test_project/subdir/file3.txt"
↓ os.path.isfile() = True
↓ total_size += 300 → total_size = 300
↑ return 300
↓ total_size += 300 → total_size = 600
↑ return 600
Current Limitation
This works, but what if we encounter a permission error or a symbolic link loop? Let's make it more robust.
Iteration 2: Robust Directory Traversal with Error Handling
New Scenario: Handling Real-World File System Issues
Real file systems have complications:
- Permission errors: can't read certain directories
- Symbolic links: can create infinite loops
- Broken links: point to non-existent files
- Special files: sockets, pipes, device files
Let's make our function production-ready:
Before (Iteration 1):
def calculate_directory_size(path: str) -> int:
total_size = 0
for item in os.listdir(path):
item_path = os.path.join(path, item)
if os.path.isfile(item_path):
total_size += os.path.getsize(item_path)
elif os.path.isdir(item_path):
total_size += calculate_directory_size(item_path)
return total_size
Problems:
- Crashes on permission errors
- Follows symbolic links (can cause infinite recursion)
- No error reporting
After (Iteration 2):
import os
from pathlib import Path
from typing import Tuple, List
def calculate_directory_size_safe(
path: str,
follow_symlinks: bool = False
) -> Tuple[int, List[str]]:
"""
Safely calculate directory size with error handling.
Args:
path: Directory path to analyze
follow_symlinks: Whether to follow symbolic links
Returns:
Tuple of (total_size_bytes, list_of_errors)
"""
total_size = 0
errors = []
try:
items = os.listdir(path)
except PermissionError:
errors.append(f"Permission denied: {path}")
return 0, errors
except OSError as e:
errors.append(f"OS error reading {path}: {e}")
return 0, errors
for item in items:
item_path = os.path.join(path, item)
# Check if it's a symbolic link
if os.path.islink(item_path):
if not follow_symlinks:
# Skip symbolic links to avoid loops
continue
# If following symlinks, check if target exists
if not os.path.exists(item_path):
errors.append(f"Broken symlink: {item_path}")
continue
try:
if os.path.isfile(item_path):
# Base case: regular file
total_size += os.path.getsize(item_path)
elif os.path.isdir(item_path):
# Recursive case: directory
subdir_size, subdir_errors = calculate_directory_size_safe(
item_path, follow_symlinks
)
total_size += subdir_size
errors.extend(subdir_errors)
except PermissionError:
errors.append(f"Permission denied: {item_path}")
except OSError as e:
errors.append(f"Error accessing {item_path}: {e}")
return total_size, errors
# Create test structure with permission issues
test_dir = Path("test_project_safe")
test_dir.mkdir(exist_ok=True)
# Regular files
(test_dir / "file1.txt").write_text("content" * 100)
# Subdirectory
subdir = test_dir / "subdir"
subdir.mkdir(exist_ok=True)
(subdir / "file2.txt").write_text("more content" * 200)
# Create a symbolic link (if on Unix-like system)
try:
link_path = test_dir / "link_to_subdir"
if not link_path.exists():
link_path.symlink_to(subdir)
except OSError:
pass # Symbolic links might not be supported
# Calculate size
size, errors = calculate_directory_size_safe(str(test_dir), follow_symlinks=False)
print(f"Directory: {test_dir}")
print(f"Total size: {size:,} bytes ({size/1024:.2f} KB)")
if errors:
print(f"\nErrors encountered:")
for error in errors:
print(f" ⚠️ {error}")
else:
print("\n✓ No errors")
# Now try with symlinks enabled
print("\n" + "="*50)
print("With symlink following enabled:")
size_with_links, errors_with_links = calculate_directory_size_safe(
str(test_dir), follow_symlinks=True
)
print(f"Total size: {size_with_links:,} bytes ({size_with_links/1024:.2f} KB)")
Python Output:
Directory: test_project_safe
Total size: 3,100 bytes (3.03 KB)
✓ No errors
==================================================
With symlink following enabled:
Total size: 5,500 bytes (5.37 KB)
Key Improvements
- Error handling: Try-except blocks catch permission errors and OS errors
- Error collection: Return errors as a list instead of crashing
- Symbolic link handling: Option to follow or skip symlinks
- Broken link detection: Check if symlink target exists
- Return tuple: (size, errors) provides both the result and diagnostics
Diagnostic Analysis: Why Error Handling Matters
Without error handling, here's what happens when you hit a protected directory:
Python Output (without error handling):
Traceback (most recent call last):
File "dir_size.py", line 45, in <module>
size = calculate_directory_size("/root")
File "dir_size.py", line 8, in calculate_directory_size
for item in os.listdir(path):
PermissionError: [Errno 13] Permission denied: '/root'
With error handling:
Directory: /root
Total size: 0 bytes (0.00 KB)
Errors encountered:
⚠️ Permission denied: /root
The program continues and reports what it could access.
When to Apply This Solution
What it optimizes for:
- Robustness: handles real-world file system issues
- Diagnostics: reports what went wrong
- Safety: prevents infinite loops from symlinks
What it sacrifices:
- Simplicity: more complex code
- Performance: extra checks for symlinks and errors
When to choose this approach:
- Production tools that scan arbitrary directories
- System administration scripts
- Backup and sync utilities
- Any tool that processes user-provided paths
When to avoid this approach:
- Controlled environments where you know the structure
- Performance-critical code where errors are impossible
- Simple scripts for personal use
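For comparison, the standard library's os.walk handles the recursion and symlink policy for you; the sketch below (the helper name is ours) collects errors through the onerror callback instead of crashing:
import os

def directory_size_walk(path):
    """Sum file sizes using os.walk, collecting errors instead of raising."""
    total, errors = 0, []
    def on_error(err):  # os.walk calls this when a directory can't be listed
        errors.append(str(err))
    for dirpath, _dirnames, filenames in os.walk(path, onerror=on_error, followlinks=False):
        for name in filenames:
            file_path = os.path.join(dirpath, name)
            try:
                total += os.path.getsize(file_path)
            except OSError as e:  # broken symlinks, permissions changing mid-scan
                errors.append(str(e))
    return total, errors

size, errs = directory_size_walk("test_project_safe")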
Iteration 3: Finding Files by Pattern
New Scenario: Finding All Python Files
A common task: find all files matching a pattern (e.g., all .py files, all .json configs) in a directory tree.
Let's build a recursive file finder:
import os
from pathlib import Path
from typing import List
import fnmatch
def find_files_by_pattern(
directory: str,
pattern: str,
follow_symlinks: bool = False
) -> List[str]:
"""
Recursively find all files matching a pattern.
Args:
directory: Root directory to search
pattern: Glob pattern (e.g., "*.py", "test_*.json")
follow_symlinks: Whether to follow symbolic links
Returns:
List of matching file paths
"""
matches = []
try:
items = os.listdir(directory)
except (PermissionError, OSError):
return matches # Skip directories we can't read
for item in items:
item_path = os.path.join(directory, item)
# Skip symlinks if not following them
if os.path.islink(item_path) and not follow_symlinks:
continue
try:
if os.path.isfile(item_path):
# Base case: check if file matches pattern
if fnmatch.fnmatch(item, pattern):
matches.append(item_path)
elif os.path.isdir(item_path):
# Recursive case: search subdirectory
subdir_matches = find_files_by_pattern(
item_path, pattern, follow_symlinks
)
matches.extend(subdir_matches)
except (PermissionError, OSError):
continue # Skip files/dirs we can't access
return matches
# Create test directory structure
test_dir = Path("test_project_find")
test_dir.mkdir(exist_ok=True)
# Create Python files
(test_dir / "main.py").write_text("# Main module")
(test_dir / "utils.py").write_text("# Utilities")
(test_dir / "README.md").write_text("# Documentation")
# Create subdirectories with more files
src_dir = test_dir / "src"
src_dir.mkdir(exist_ok=True)
(src_dir / "module1.py").write_text("# Module 1")
(src_dir / "module2.py").write_text("# Module 2")
(src_dir / "config.json").write_text('{"key": "value"}')
tests_dir = test_dir / "tests"
tests_dir.mkdir(exist_ok=True)
(tests_dir / "test_main.py").write_text("# Tests for main")
(tests_dir / "test_utils.py").write_text("# Tests for utils")
# Find all Python files
python_files = find_files_by_pattern(str(test_dir), "*.py")
print(f"Found {len(python_files)} Python files:")
for file_path in sorted(python_files):
# Show relative path for readability
rel_path = os.path.relpath(file_path, test_dir)
print(f" - {rel_path}")
# Find all test files
print("\nTest files:")
test_files = find_files_by_pattern(str(test_dir), "test_*.py")
for file_path in sorted(test_files):
rel_path = os.path.relpath(file_path, test_dir)
print(f" - {rel_path}")
# Find all JSON files
print("\nJSON files:")
json_files = find_files_by_pattern(str(test_dir), "*.json")
for file_path in sorted(json_files):
rel_path = os.path.relpath(file_path, test_dir)
print(f" - {rel_path}")
Python Output:
Found 6 Python files:
- main.py
- src/module1.py
- src/module2.py
- tests/test_main.py
- tests/test_utils.py
- utils.py
Test files:
- tests/test_main.py
- tests/test_utils.py
JSON files:
- src/config.json
Recursion Tree for File Finding
Let's visualize how the recursion explores the directory tree:
find_files_by_pattern("test_project_find", "*.py")
├─ Check: main.py → MATCH! ✓
├─ Check: utils.py → MATCH! ✓
├─ Check: README.md → no match
├─ Recurse: src/
│ ├─ Check: module1.py → MATCH! ✓
│ ├─ Check: module2.py → MATCH! ✓
│ └─ Check: config.json → no match
└─ Recurse: tests/
├─ Check: test_main.py → MATCH! ✓
└─ Check: test_utils.py → MATCH! ✓
Result: [main.py, utils.py, src/module1.py, src/module2.py,
tests/test_main.py, tests/test_utils.py]
Practical Extension: File Statistics
Let's extend this to gather statistics about found files:
import os
from pathlib import Path
from typing import Any, Dict, List
import fnmatch
from datetime import datetime
def analyze_files_by_pattern(
directory: str,
pattern: str
) -> Dict[str, Any]:
"""
Find files matching pattern and gather statistics.
Returns dictionary with:
- files: list of matching paths
- total_size: total size in bytes
- count: number of files
- largest: path to largest file
- newest: path to most recently modified file
"""
stats = {
"files": [],
"total_size": 0,
"count": 0,
"largest": None,
"largest_size": 0,
"newest": None,
"newest_time": 0
}
def recurse(path: str):
try:
items = os.listdir(path)
except (PermissionError, OSError):
return
for item in items:
item_path = os.path.join(path, item)
try:
if os.path.isfile(item_path):
# Base case: check if file matches
if fnmatch.fnmatch(item, pattern):
stats["files"].append(item_path)
stats["count"] += 1
# Get file size
size = os.path.getsize(item_path)
stats["total_size"] += size
# Track largest file
if size > stats["largest_size"]:
stats["largest"] = item_path
stats["largest_size"] = size
# Track newest file
mtime = os.path.getmtime(item_path)
if mtime > stats["newest_time"]:
stats["newest"] = item_path
stats["newest_time"] = mtime
elif os.path.isdir(item_path):
# Recursive case: search subdirectory
recurse(item_path)
except (PermissionError, OSError):
continue
recurse(directory)
return stats
# Analyze Python files in our test project
stats = analyze_files_by_pattern(str(test_dir), "*.py")
print("Python File Analysis")
print("=" * 50)
print(f"Total files: {stats['count']}")
print(f"Total size: {stats['total_size']:,} bytes ({stats['total_size']/1024:.2f} KB)")
if stats['largest']:
rel_path = os.path.relpath(stats['largest'], test_dir)
print(f"\nLargest file: {rel_path}")
print(f" Size: {stats['largest_size']:,} bytes")
if stats['newest']:
rel_path = os.path.relpath(stats['newest'], test_dir)
mtime = datetime.fromtimestamp(stats['newest_time'])
print(f"\nMost recently modified: {rel_path}")
print(f" Modified: {mtime.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"\nAll files:")
for file_path in sorted(stats['files']):
rel_path = os.path.relpath(file_path, test_dir)
size = os.path.getsize(file_path)
print(f" - {rel_path} ({size} bytes)")
Python Output:
Python File Analysis
==================================================
Total files: 6
Total size: 77 bytes (0.08 KB)
Largest file: tests/test_utils.py
Size: 17 bytes
Most recently modified: tests/test_utils.py
Modified: 2024-01-15 14:32:18
All files:
- main.py (13 bytes)
- src/module1.py (10 bytes)
- src/module2.py (10 bytes)
- tests/test_main.py (16 bytes)
- tests/test_utils.py (17 bytes)
- utils.py (11 bytes)
Complexity Analysis
Time Complexity: O(n), where n = total number of files and directories
- Each file/directory visited exactly once
- Pattern matching is O(m), where m = pattern length (negligible)
- File stat operations are O(1)
Space Complexity: O(d + k), where d = maximum depth and k = number of matching files
- Call stack: O(d) for directory depth
- Results list: O(k) matching file paths
- Stats dictionary: O(1) constant size
Recurrence Relation: T(n) = T(n₁) + T(n₂) + ... + T(nₖ) + O(1)
- Where n₁, n₂, ..., nₖ are the subdirectories
- Each subdirectory is processed independently
- Solves to O(n) total work
Real-World Applications
This pattern is used in:
- Build systems: finding source files to compile
- Linters: finding code files to analyze
- Backup tools: finding files to back up
- Search utilities: finding files by name/pattern
- Code analysis: finding test files, documentation, etc.
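When you don't need custom error handling, pathlib can do the recursive matching for you; a minimal sketch (behavior around unreadable directories varies across Python versions):
from pathlib import Path

# Path.rglob recurses into subdirectories and applies the glob pattern at every level
python_files = sorted(Path("test_project_find").rglob("*.py"))
for p in python_files:
    print(p)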
Recursive Aggregation
Recursive aggregation combines data from nested structures into summary statistics. We've already seen examples (directory size, file counts), but let's explore more sophisticated aggregation patterns.
The Pattern: Accumulating Results Through Recursion
The key insight: each recursive call returns a partial result, and we combine these results as we return up the call stack.
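Before the larger examples, here is the pattern at its smallest, a sketch using the same {"sales": ..., "teams": [...]} shape as Example 1 below: each call returns its subtree's total, and the caller adds those partial results together on the way back up.
def total_sales(node):
    """Return this node's sales plus the totals of all its sub-teams."""
    subtotal = node.get("sales", 0)          # current node's contribution
    for team in node.get("teams", []):
        subtotal += total_sales(team)        # combine child results while returning
    return subtotal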
Example 1: Nested Sales Data Aggregation
Consider a company with nested organizational structure:
from typing import Dict, Any
def aggregate_sales(org_data: Dict[str, Any]) -> Dict[str, float]:
"""
Recursively aggregate sales data from organizational hierarchy.
org_data structure:
{
"name": "Division A",
"sales": 1000,
"teams": [
{"name": "Team 1", "sales": 500},
{"name": "Team 2", "sales": 300, "teams": [...]}
]
}
Returns aggregated statistics.
"""
stats = {
"total_sales": 0,
"unit_count": 0,
"max_sales": 0,
"min_sales": float('inf')
}
def recurse(node: Dict[str, Any]):
# Count this unit
stats["unit_count"] += 1
# Add this unit's sales
sales = node.get("sales", 0)
stats["total_sales"] += sales
stats["max_sales"] = max(stats["max_sales"], sales)
stats["min_sales"] = min(stats["min_sales"], sales)
# Recurse into sub-teams
for team in node.get("teams", []):
recurse(team)
recurse(org_data)
# Handle edge case: no units found
if stats["min_sales"] == float('inf'):
stats["min_sales"] = 0
return stats
# Example organizational data
company = {
"name": "Company HQ",
"sales": 5000,
"teams": [
{
"name": "Division A",
"sales": 3000,
"teams": [
{"name": "Team A1", "sales": 1500},
{"name": "Team A2", "sales": 1200}
]
},
{
"name": "Division B",
"sales": 4000,
"teams": [
{
"name": "Team B1",
"sales": 2000,
"teams": [
{"name": "Sub-team B1a", "sales": 800},
{"name": "Sub-team B1b", "sales": 600}
]
},
{"name": "Team B2", "sales": 1500}
]
}
]
}
stats = aggregate_sales(company)
print("Sales Aggregation Report")
print("=" * 50)
print(f"Total organizational units: {stats['unit_count']}")
print(f"Total sales: ${stats['total_sales']:,}")
print(f"Average sales per unit: ${stats['total_sales'] / stats['unit_count']:,.2f}")
print(f"Highest performing unit: ${stats['max_sales']:,}")
print(f"Lowest performing unit: ${stats['min_sales']:,}")
Python Output:
Sales Aggregation Report
==================================================
Total organizational units: 9
Total sales: $19,600
Average sales per unit: $2,177.78
Highest performing unit: $5,000
Lowest performing unit: $600
Call Stack Visualization: Aggregation Flow
Let's trace how aggregation works for a simplified structure:
company = {
"name": "HQ", "sales": 100,
"teams": [
{"name": "A", "sales": 50},
{"name": "B", "sales": 30}
]
}
Execution trace:
aggregate_sales(company)
↓ stats = {total_sales: 0, unit_count: 0, ...}
↓ recurse({"name": "HQ", "sales": 100, "teams": [...]})
↓ stats["unit_count"] = 1
↓ stats["total_sales"] = 100
↓ for team in [{"name": "A", ...}, {"name": "B", ...}]:
↓ recurse({"name": "A", "sales": 50})
↓ stats["unit_count"] = 2
↓ stats["total_sales"] = 150
↓ no sub-teams
↑ return
↓ recurse({"name": "B", "sales": 30})
↓ stats["unit_count"] = 3
↓ stats["total_sales"] = 180
↓ no sub-teams
↑ return
↑ return
↑ return stats = {total_sales: 180, unit_count: 3, ...}
Example 2: Nested Comment Thread Aggregation
Social media platforms have nested comment threads. Let's aggregate statistics:
from typing import List, Dict, Any
from datetime import datetime
def analyze_comment_thread(comment: Dict[str, Any]) -> Dict[str, Any]:
"""
Recursively analyze nested comment thread.
comment structure:
{
"id": 1,
"text": "Great post!",
"likes": 10,
"timestamp": "2024-01-15T10:00:00",
"replies": [...] # Nested comments
}
Returns thread statistics.
"""
stats = {
"total_comments": 0,
"total_likes": 0,
"max_depth": 0,
"longest_comment": "",
"longest_length": 0,
"most_liked": None,
"most_likes": 0
}
def recurse(comment: Dict[str, Any], depth: int):
# Update depth tracking
stats["max_depth"] = max(stats["max_depth"], depth)
# Count this comment
stats["total_comments"] += 1
# Aggregate likes
likes = comment.get("likes", 0)
stats["total_likes"] += likes
# Track most liked comment
if likes > stats["most_likes"]:
stats["most_likes"] = likes
stats["most_liked"] = comment.get("text", "")
# Track longest comment
text = comment.get("text", "")
if len(text) > stats["longest_length"]:
stats["longest_length"] = len(text)
stats["longest_comment"] = text
# Recurse into replies
for reply in comment.get("replies", []):
recurse(reply, depth + 1)
recurse(comment, 0)
return stats
# Example comment thread
thread = {
"id": 1,
"text": "This is an amazing article about recursion!",
"likes": 50,
"replies": [
{
"id": 2,
"text": "I agree! Very clear explanation.",
"likes": 20,
"replies": [
{
"id": 3,
"text": "The examples really helped me understand.",
"likes": 15,
"replies": []
},
{
"id": 4,
"text": "Same here!",
"likes": 5,
"replies": []
}
]
},
{
"id": 5,
"text": "Could you explain the time complexity in more detail?",
"likes": 30,
"replies": [
{
"id": 6,
"text": "Sure! The time complexity is O(n) because we visit each node once.",
"likes": 40,
"replies": []
}
]
}
]
}
stats = analyze_comment_thread(thread)
print("Comment Thread Analysis")
print("=" * 50)
print(f"Total comments: {stats['total_comments']}")
print(f"Total likes: {stats['total_likes']}")
print(f"Average likes per comment: {stats['total_likes'] / stats['total_comments']:.1f}")
print(f"Maximum thread depth: {stats['max_depth']}")
print(f"\nMost liked comment ({stats['most_likes']} likes):")
print(f' "{stats["most_liked"]}"')
print(f"\nLongest comment ({stats['longest_length']} characters):")
print(f' "{stats["longest_comment"]}"')
Python Output:
Comment Thread Analysis
==================================================
Total comments: 6
Total likes: 160
Average likes per comment: 26.7
Maximum thread depth: 2
Most liked comment (50 likes):
"This is an amazing article about recursion!"
Longest comment (66 characters):
"Sure! The time complexity is O(n) because we visit each node once."
Recursion Tree: Comment Thread Structure
Comment 1 (depth=0, likes=50)
├─ Comment 2 (depth=1, likes=20)
│ ├─ Comment 3 (depth=2, likes=15)
│ └─ Comment 4 (depth=2, likes=5)
└─ Comment 5 (depth=1, likes=30)
└─ Comment 6 (depth=2, likes=40)
Aggregation flow:
- Visit Comment 1: total_likes = 50, max_depth = 0
- Visit Comment 2: total_likes = 70, max_depth = 1
- Visit Comment 3: total_likes = 85, max_depth = 2
- Visit Comment 4: total_likes = 90, max_depth = 2
- Visit Comment 5: total_likes = 120, max_depth = 1
- Visit Comment 6: total_likes = 160, max_depth = 2
Example 3: Multi-Level Category Aggregation
E-commerce sites have nested product categories. Let's aggregate product counts:
from typing import Dict, List, Any
def aggregate_category_stats(category: Dict[str, Any]) -> Dict[str, Any]:
"""
Recursively aggregate product statistics across category hierarchy.
category structure:
{
"name": "Electronics",
"products": 50,
"subcategories": [...]
}
Returns aggregated statistics with breakdown by category.
"""
result = {
"name": category["name"],
"direct_products": category.get("products", 0),
"total_products": 0,
"subcategory_count": 0,
"subcategories": []
}
# Start with direct products
result["total_products"] = result["direct_products"]
# Recurse into subcategories
for subcat in category.get("subcategories", []):
# Recursive call
subcat_stats = aggregate_category_stats(subcat)
# Accumulate totals
result["total_products"] += subcat_stats["total_products"]
result["subcategory_count"] += 1 + subcat_stats["subcategory_count"]
# Store subcategory results
result["subcategories"].append(subcat_stats)
return result
# Example category hierarchy
catalog = {
"name": "All Products",
"products": 0,
"subcategories": [
{
"name": "Electronics",
"products": 100,
"subcategories": [
{
"name": "Computers",
"products": 50,
"subcategories": [
{"name": "Laptops", "products": 30},
{"name": "Desktops", "products": 20}
]
},
{
"name": "Phones",
"products": 40,
"subcategories": []
}
]
},
{
"name": "Clothing",
"products": 200,
"subcategories": [
{"name": "Men", "products": 100},
{"name": "Women", "products": 100}
]
}
]
}
stats = aggregate_category_stats(catalog)
def print_category_tree(stats: Dict[str, Any], indent: int = 0):
"""Helper to print category tree with statistics."""
prefix = " " * indent
print(f"{prefix}{stats['name']}:")
print(f"{prefix} Direct products: {stats['direct_products']}")
print(f"{prefix} Total products: {stats['total_products']}")
print(f"{prefix} Subcategories: {stats['subcategory_count']}")
for subcat in stats["subcategories"]:
print()
print_category_tree(subcat, indent + 1)
print("Product Catalog Analysis")
print("=" * 50)
print_category_tree(stats)
Python Output:
Product Catalog Analysis
==================================================
All Products:
Direct products: 0
Total products: 640
Subcategories: 8
Electronics:
Direct products: 100
Total products: 240
Subcategories: 4
Computers:
Direct products: 50
Total products: 100
Subcategories: 2
Laptops:
Direct products: 30
Total products: 30
Subcategories: 0
Desktops:
Direct products: 20
Total products: 20
Subcategories: 0
Phones:
Direct products: 40
Total products: 40
Subcategories: 0
Clothing:
Direct products: 200
Total products: 400
Subcategories: 2
Men:
Direct products: 100
Total products: 100
Subcategories: 0
Women:
Direct products: 100
Total products: 100
Subcategories: 0
The Aggregation Pattern: Key Insights
Pattern structure:
1. Initialize accumulator: start with a zero/empty state
2. Process current node: add the current node's contribution
3. Recurse on children: get child results
4. Combine results: merge child results with the current node's
5. Return aggregate: pass the combined result up the stack
When to use this pattern:
- Computing totals across hierarchies (sales, products, file sizes)
- Finding extremes (max, min) in nested structures
- Counting elements at all levels
- Building summary reports from detailed data
Complexity characteristics:
- Time: O(n), where n = total nodes (each visited once)
- Space: O(d), where d = maximum depth (call stack)
- Recurrence: T(n) = Σ T(children) + O(1)
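The same five steps can be wrapped in a small generic helper; this is a hypothetical sketch (the names are ours, not from the examples above):
def reduce_tree(node, get_children, get_value, combine, initial):
    """Fold a value over a tree: take the node's value, recurse, combine on the way up."""
    acc = combine(initial, get_value(node))
    for child in get_children(node):
        acc = combine(acc, reduce_tree(child, get_children, get_value, combine, initial))
    return acc

# Example: total sales over the company hierarchy from Example 1
total = reduce_tree(
    company,
    get_children=lambda n: n.get("teams", []),
    get_value=lambda n: n.get("sales", 0),
    combine=lambda a, b: a + b,
    initial=0,
)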
Common Failure Modes and Their Signatures
Symptom: Infinite Recursion on Circular References
Python output pattern:
RecursionError: maximum recursion depth exceeded
Diagnostic clues:
- Traceback shows the same function called repeatedly with the same or similar arguments
- Happens with data structures that reference themselves (circular references)
- Common in graph-like structures, symbolic links, or poorly designed data
Example scenario:
# Circular reference in data
data = {"name": "A", "children": []}
data["children"].append(data) # Points back to itself!
# This will crash:
def process(node):
for child in node["children"]:
process(child) # Infinite loop!
Root cause: No mechanism to detect already-visited nodes.
Solution: Track visited nodes with a set:
def process_with_cycle_detection(node, visited=None):
if visited is None:
visited = set()
# Use id() to track object identity
node_id = id(node)
if node_id in visited:
return # Already processed, skip
visited.add(node_id)
# Process node...
for child in node.get("children", []):
process_with_cycle_detection(child, visited)
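A quick check with the circular data built above shows that the traversal now terminates instead of recursing forever:
data = {"name": "A", "children": []}
data["children"].append(data)             # circular reference
process_with_cycle_detection(data)        # second visit to the same object is skipped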
Symptom: TypeError on Unexpected Data Types
Python output pattern:
TypeError: 'int' object is not iterable
TypeError: 'NoneType' object is not subscriptable
Diagnostic clues:
- Happens when code assumes all data is dict/list but encounters primitives
- Common when processing JSON with mixed types
- Often occurs deep in the recursion when hitting leaf values
Root cause: Missing type checks before attempting iteration or subscripting.
Solution: Add explicit type checking:
def safe_process(data):
# Check type before processing
if isinstance(data, dict):
for key, value in data.items():
safe_process(value)
elif isinstance(data, list):
for item in data:
safe_process(item)
elif isinstance(data, (str, int, float, bool, type(None))):
# Base case: primitive value
pass # Process primitive
else:
# Unknown type - log warning or raise error
print(f"Warning: unexpected type {type(data)}")
Symptom: PermissionError or OSError
Python output pattern:
PermissionError: [Errno 13] Permission denied: '/root/secret'
OSError: [Errno 2] No such file or directory: '/broken/link'
Diagnostic clues:
- Happens when recursing through file systems
- Common with system directories, protected files, broken symlinks
- Crashes the entire program if not handled
Root cause: No error handling for file system operations.
Solution: Wrap file operations in try-except:
def safe_directory_walk(path):
try:
items = os.listdir(path)
except (PermissionError, OSError) as e:
print(f"Skipping {path}: {e}")
return # Skip this directory
for item in items:
item_path = os.path.join(path, item)
try:
if os.path.isdir(item_path):
safe_directory_walk(item_path)
except (PermissionError, OSError) as e:
print(f"Skipping {item_path}: {e}")
continue
Symptom: Memory Error or Slow Performance
Python output pattern:
MemoryError
(or program becomes very slow and unresponsive)
Diagnostic clues:
- Happens with very deep or very wide data structures
- The call stack grows too large
- Common with unbalanced trees or deeply nested JSON
Root cause: Recursion depth exceeds Python's limit (default 1000) or uses too much memory.
Solution: Consider iterative approach with explicit stack:
def iterative_process(root):
"""Iterative version using explicit stack."""
stack = [root]
while stack:
node = stack.pop()
# Process node...
# Add children to stack
if isinstance(node, dict):
stack.extend(node.values())
elif isinstance(node, list):
stack.extend(node)
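If rewriting the recursion is not practical, Python's recursion limit can also be inspected and raised, though this only buys headroom (and a larger call stack) rather than removing the limit:
import sys

print(sys.getrecursionlimit())    # typically 1000 by default
sys.setrecursionlimit(10_000)     # allow deeper recursion, at the cost of more stack usage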
Symptom: Incorrect Aggregation Results
Python output pattern:
Expected: 100
Got: 0
(or other incorrect value)
Diagnostic clues:
- Aggregation returns wrong totals
- Often happens when forgetting to include the current node's value
- Or when not properly combining child results
Root cause: Logic error in aggregation—missing current value or incorrect combination.
Solution: Ensure current node is counted AND child results are combined:
def correct_aggregation(node):
# Start with current node's value
total = node.get("value", 0) # Don't forget this!
# Add children's totals
for child in node.get("children", []):
child_total = correct_aggregation(child)
total += child_total # Combine results
return total
Debugging Workflow: When Your Recursive Data Processing Fails
Step 1: Identify the failure point
- Read the complete traceback
- Note which recursive call failed
- Check the data that caused the failure
Step 2: Add diagnostic prints
def debug_process(data, depth=0):
indent = " " * depth
print(f"{indent}Processing: {type(data).__name__}")
if isinstance(data, dict):
print(f"{indent}Keys: {list(data.keys())}")
for key, value in data.items():
print(f"{indent}Recursing into key: {key}")
debug_process(value, depth + 1)
Step 3: Validate assumptions
- Check data types at each level
- Verify base cases are reached
- Ensure recursive calls make progress toward the base case
Step 4: Test with minimal data
- Create the smallest possible failing example
- Trace the execution by hand
- Verify each step matches expectations
Step 5: Apply the appropriate fix
- Circular references → add visited tracking
- Type errors → add type checks
- File errors → add error handling
- Memory issues → consider an iterative approach
- Wrong results → check the aggregation logic
The Complete Journey: From Problem to Production
Summary: Evolution of Our Configuration Analyzer
| Iteration | Problem | Technique Applied | Result | Complexity Impact |
|---|---|---|---|---|
| 0 | Iterate over dict keys | None | Returns top-level keys, misses nested values | N/A |
| 1 | Wrong iteration target | Type-specific iteration | Collects strings only | O(n) time, O(d) space |
| 2 | Missing non-string values | Expanded base case | Collects all primitives | O(n) time, O(d+n) space |
| 3 | Unknown value locations | Accumulator pattern (path) | Tracks paths to values | O(n) time, O(d+n) space |
| 4 | Real file format support | JSON integration, stats | Production-ready tool | O(n) time, O(d+n) space |
Final Implementation: Complete Configuration Analyzer
import json
from typing import Any, Dict, List, Tuple
from pathlib import Path
class ConfigAnalyzer:
"""Production-ready recursive configuration analyzer."""
def __init__(self):
self.stats = {
"total_values": 0,
"by_type": {},
"max_depth": 0,
"paths": [],
"errors": []
}
def analyze_file(self, filepath: str) -> Dict[str, Any]:
"""Analyze a JSON configuration file."""
try:
with open(filepath, 'r') as f:
data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError) as e:
self.stats["errors"].append(f"Error loading file: {e}")
return self.stats
self._recurse(data, "root", 0)
return self.stats
def analyze_data(self, data: Any) -> Dict[str, Any]:
"""Analyze configuration data directly."""
self._recurse(data, "root", 0)
return self.stats
def _recurse(self, data: Any, path: str, depth: int):
"""Recursive analysis implementation."""
# Track depth
self.stats["max_depth"] = max(self.stats["max_depth"], depth)
# Base case: primitive value
if isinstance(data, (str, int, float, bool, type(None))):
self.stats["total_values"] += 1
type_name = type(data).__name__
self.stats["by_type"][type_name] = \
self.stats["by_type"].get(type_name, 0) + 1
self.stats["paths"].append((path, data))
return
# Recursive case: nested structure
try:
if isinstance(data, dict):
for key, value in data.items():
new_path = f"{path}.{key}"
self._recurse(value, new_path, depth + 1)
elif isinstance(data, list):
for index, item in enumerate(data):
new_path = f"{path}[{index}]"
self._recurse(item, new_path, depth + 1)
except Exception as e:
self.stats["errors"].append(f"Error at {path}: {e}")
def find_sensitive_data(self, keywords: List[str] = None) -> List[Tuple[str, Any]]:
"""Find potentially sensitive configuration values."""
if keywords is None:
keywords = ["password", "secret", "key", "token", "credential"]
sensitive = []
for path, value in self.stats["paths"]:
path_lower = path.lower()
if any(keyword in path_lower for keyword in keywords):
sensitive.append((path, value))
return sensitive
def generate_report(self) -> str:
"""Generate human-readable analysis report."""
lines = [
"Configuration Analysis Report",
"=" * 50,
f"Total values: {self.stats['total_values']}",
f"Maximum nesting depth: {self.stats['max_depth']}",
"",
"Values by type:"
]
for type_name, count in sorted(self.stats['by_type'].items()):
lines.append(f" {type_name}: {count}")
sensitive = self.find_sensitive_data()
if sensitive:
lines.extend([
"",
"⚠️ Sensitive data found:",
])
for path, value in sensitive:
masked = '*' * len(str(value))
lines.append(f" {path} = {masked}")
if self.stats["errors"]:
lines.extend([
"",
"Errors encountered:",
])
for error in self.stats["errors"]:
lines.append(f" ❌ {error}")
return "\n".join(lines)
# Example usage
config = {
"app": {
"name": "MyApp",
"version": "1.0.0",
"debug": True
},
"database": {
"host": "localhost",
"port": 5432,
"credentials": {
"username": "admin",
"password": "secret123"
}
},
"api_keys": {
"stripe": "sk_test_abc123",
"sendgrid": "SG.xyz789"
},
"features": ["auth", "payments", "analytics"]
}
# Analyze configuration
analyzer = ConfigAnalyzer()
stats = analyzer.analyze_data(config)
# Print report
print(analyzer.generate_report())
Python Output:
Configuration Analysis Report
==================================================
Total values: 12
Maximum nesting depth: 3
Values by type:
bool: 1
int: 1
str: 10
⚠️ Sensitive data found:
root.database.credentials.username = *****
root.database.credentials.password = *********
root.api_keys.stripe = **************
root.api_keys.sendgrid = *********
Decision Framework: Recursive vs Iterative for Data Processing
| Criterion | Recursive Approach | Iterative Approach |
|---|---|---|
| Code clarity | ✓ Natural for hierarchical data | More verbose, explicit stack |
| Depth limit | ✗ Limited by Python (1000 default) | ✓ No inherent limit |
| Memory usage | ✗ Call stack overhead | ✓ Explicit stack, more control |
| Debugging | ✗ Harder to trace call stack | ✓ Easier to inspect state |
| Performance | Similar (slight overhead from calls) | Similar (slight overhead from stack ops) |
| Best for | Trees, JSON, XML, moderate depth | Very deep structures, graphs |
When to Choose Recursion for Data Processing
Choose recursion when:
- The data structure is naturally hierarchical (trees, JSON, XML)
- Maximum depth is reasonable (typically < 100 levels)
- Code clarity is more important than maximum performance
- You need to track path information naturally
Choose iteration when:
- Data might be arbitrarily deep
- Working with graphs (you need explicit visited tracking anyway)
- Memory is extremely constrained
- Debugging is critical (easier to inspect an explicit stack)
Lessons Learned
- Start with the simplest case: Begin with basic recursion, add complexity only when needed
- Type checking is essential: Real-world data has mixed types—handle them explicitly
- Error handling is not optional: File systems and external data will fail—plan for it
- Path tracking enables debugging: Knowing where data came from is invaluable
- Accumulator pattern is powerful: Pass state down, aggregate results up
- Test with real data: Synthetic examples miss edge cases that real data exposes
Complexity Analysis: Final Implementation
Time Complexity: O(n), where n = total values in the nested structure
- Each value visited exactly once
- Type checking: O(1) per value
- Path building: O(d) per value, where d = depth (amortized O(1))
- Overall: O(n)
Space Complexity: O(d + n), where d = max depth and n = total values
- Call stack: O(d) for maximum nesting depth
- Results storage: O(n) for paths and values
- Stats dictionary: O(k), where k = number of unique types (constant)
- Overall: O(d + n)
Recurrence Relation: T(n) = Σ T(children) + O(1)
- For a dict with k keys: T(n) = T(n₁) + T(n₂) + ... + T(nₖ) + O(1)
- For a list with m items: T(n) = T(n₁) + T(n₂) + ... + T(nₘ) + O(1)
- Solves to O(n) total work
Real-World Applications
This pattern and these techniques are used in:
- Configuration Management
  - Kubernetes config validation
  - Terraform state analysis
  - Application config auditing
- Data Processing Pipelines
  - ETL transformations
  - Data validation
  - Schema migration
- Security Tools
  - Secret scanning
  - Compliance checking
  - Vulnerability assessment
- Development Tools
  - Linters and formatters
  - Documentation generators
  - Code analysis tools
- System Administration
  - File system analysis
  - Log aggregation
  - Resource monitoring
The recursive approach to data processing is not just an academic exercise—it's a fundamental technique in production systems that handle hierarchical data every day.